import $ivy.`org.apache.spark::spark-sql:3.0.1`
import $ivy.`org.apache.spark::spark-mllib:3.0.1`
import $ivy.`org.plotly-scala::plotly-almond:0.7.6`
import $ivy.$ import $ivy.$ import $ivy.$
// Add the locally-built TDM assembly jar to the interpreter classpath so the
// tdm._ imports below resolve.
val currentDirectory = new java.io.File(".").getCanonicalPath
val path = java.nio.file.FileSystems.getDefault().getPath(currentDirectory, "lib", "TDM-assembly-0.3.0.jar")
val x = ammonite.ops.Path(path)
interp.load.cp(x)
currentDirectory: String = "/data/gillet/notebooks/TDM" path: java.nio.file.Path = /data/gillet/notebooks/TDM/lib/TDM-assembly-0.3.0.jar x: os.Path = root/'data/'gillet/'notebooks/'TDM/'lib/"TDM-assembly-0.3.0.jar"
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions._
import tdm._
import tdm.core._
import tdm.core.decomposition.Norm
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} import org.apache.spark.sql.functions._ import tdm._ import tdm.core._ import tdm.core.decomposition.Norm
// Local Spark session for the DARPA 1998 analysis (implicit so the TDM API
// can pick it up). Runs on all available cores of this machine.
implicit val spark = {
  val builder = SparkSession.builder()
    .master("local[*]")
    .appName("DARPA1998")
  builder.getOrCreate()
}
// Keep the notebook output readable: only errors from Spark.
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 21/05/08 17:11:11 INFO SparkContext: Running Spark version 3.0.1 21/05/08 17:11:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 21/05/08 17:11:11 INFO ResourceUtils: ============================================================== 21/05/08 17:11:11 INFO ResourceUtils: Resources for spark.driver: 21/05/08 17:11:11 INFO ResourceUtils: ============================================================== 21/05/08 17:11:11 INFO SparkContext: Submitted application: DARPA1998 21/05/08 17:11:11 INFO SecurityManager: Changing view acls to: gillet 21/05/08 17:11:11 INFO SecurityManager: Changing modify acls to: gillet 21/05/08 17:11:11 INFO SecurityManager: Changing view acls groups to: 21/05/08 17:11:11 INFO SecurityManager: Changing modify acls groups to: 21/05/08 17:11:11 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(gillet); groups with view permissions: Set(); users with modify permissions: Set(gillet); groups with modify permissions: Set() 21/05/08 17:11:12 INFO Utils: Successfully started service 'sparkDriver' on port 43017. 
21/05/08 17:11:12 INFO SparkEnv: Registering MapOutputTracker 21/05/08 17:11:12 INFO SparkEnv: Registering BlockManagerMaster 21/05/08 17:11:12 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information 21/05/08 17:11:12 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up 21/05/08 17:11:12 INFO SparkEnv: Registering BlockManagerMasterHeartbeat 21/05/08 17:11:12 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-7755d8ba-d244-4fb0-ad02-5e517f09f276 21/05/08 17:11:12 INFO MemoryStore: MemoryStore started with capacity 17.8 GiB 21/05/08 17:11:12 INFO SparkEnv: Registering OutputCommitCoordinator 21/05/08 17:11:12 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041. 21/05/08 17:11:12 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042. 21/05/08 17:11:12 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043. 21/05/08 17:11:12 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044. 21/05/08 17:11:12 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045. 21/05/08 17:11:12 WARN Utils: Service 'SparkUI' could not bind on port 4045. Attempting port 4046. 21/05/08 17:11:12 WARN Utils: Service 'SparkUI' could not bind on port 4046. Attempting port 4047. 21/05/08 17:11:12 INFO Utils: Successfully started service 'SparkUI' on port 4047. 21/05/08 17:11:12 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://cocktail-jboss.u-bourgogne.fr:4047 21/05/08 17:11:12 INFO Executor: Starting executor ID driver on host cocktail-jboss.u-bourgogne.fr 21/05/08 17:11:12 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 35829. 
21/05/08 17:11:12 INFO NettyBlockTransferService: Server created on cocktail-jboss.u-bourgogne.fr:35829 21/05/08 17:11:12 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy 21/05/08 17:11:13 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, cocktail-jboss.u-bourgogne.fr, 35829, None) 21/05/08 17:11:13 INFO BlockManagerMasterEndpoint: Registering block manager cocktail-jboss.u-bourgogne.fr:35829 with 17.8 GiB RAM, BlockManagerId(driver, cocktail-jboss.u-bourgogne.fr, 35829, None) 21/05/08 17:11:13 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, cocktail-jboss.u-bourgogne.fr, 35829, None) 21/05/08 17:11:13 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, cocktail-jboss.u-bourgogne.fr, 35829, None)
spark: SparkSession = org.apache.spark.sql.SparkSession@449557e0 import spark.implicits._
// Raw DARPA 1998 tensor file: tab-separated (source_id, destination_id, time_id, val) rows.
var darpa = spark.read
  .format("csv")
  .option("sep", "\t")
  .option("header", "false")
  .load("datasets/DARPA1998/1998DARPA.tensor")
  .toDF("source_id", "destination_id", "time_id", "val")
  .withColumn("val", col("val").cast("int"))

// Dictionary mapping time_id to a real timestamp, rebuilt from the day + hour columns.
val timeDic = {
  val raw = spark.read
    .format("csv")
    .option("sep", "\t")
    .option("header", "false")
    .load("datasets/DARPA1998/time_dic")
    .toDF("hour", "day", "time_id")
  raw
    .withColumn("time", to_timestamp(concat(col("day"), lit(" "), col("hour")), "MM/dd/yyyy HH:mm:ss.SSS"))
    .drop("day", "hour")
}

// Dictionary mapping ip_id to the textual IP address.
val ipDic = spark.read
  .format("csv")
  .option("sep", "\t")
  .option("header", "false")
  .load("datasets/DARPA1998/ip_dic")
  .toDF("ip", "ip_id")
darpa: DataFrame = [val: bigint, time: bigint ... 2 more fields]
timeDic: DataFrame = [time_id: string, time: timestamp]
ipDic: DataFrame = [ip: string, ip_id: string]
// Replace time_id with the real timestamp, then aggregate traffic into
// 1-minute windows; "time" becomes the window start cast to epoch seconds
// (the plotting code below multiplies it by 1000 to get milliseconds).
darpa = darpa.join(timeDic, "time_id")
.drop("time_id")
.groupBy(col("source_id"), col("destination_id"), window(col("time"), "1 minutes")).sum("val")
.withColumnRenamed("sum(val)", "val")
.withColumn("time", col("window.start").cast("long"))
.drop("window")
// Replace source_id with the actual source IP address.
// Inner join: rows whose source_id has no entry in ipDic are dropped.
darpa = darpa.join(ipDic, darpa.col("source_id") === ipDic.col("ip_id"))
.drop("source_id", "ip_id")
.withColumnRenamed("ip", "source")
// Replace destination_id with the actual destination IP address (same dictionary).
darpa = darpa.join(ipDic, darpa.col("destination_id") === ipDic.col("ip_id"))
.drop("destination_id", "ip_id")
.withColumnRenamed("ip", "destination")
// Tensor dimension declarations for the TDM API; the type parameter fixes the
// Scala type of each dimension's values (IP strings, epoch-second Longs).
object Source extends TensorDimension[String]
object Destination extends TensorDimension[String]
object Time extends TensorDimension[Long]
defined object Source defined object Destination defined object Time
// Build a 3-way tensor over (Source, Destination, Time) with Long values taken
// from the "val" column of darpa. Note: the order of addDimension calls
// determines the tensor's type-level dimension list.
val tensor = TensorBuilder[Long](darpa)
.addDimension(Source, "source")
.addDimension(Destination, "destination")
.addDimension(Time, "time")
.build("val")
tensor: Tensor[Long, shapeless.::[Time.type, shapeless.::[Destination.type, shapeless.::[Source.type, shapeless.HNil]]]] = tdm.core.Tensor@1b039ea5
// Rank-10 CP (canonical polyadic) decomposition with L2 normalisation,
// also computing the CORCONDIA core-consistency diagnostic.
// NOTE(review): the run reported CORCONDIA ≈ -1.3e13 (strongly negative),
// which suggests rank 10 may be too high for this tensor — worth revisiting.
val kruskal = tensor.canonicalPolyadicDecomposition(10, norm = Norm.L2, computeCorcondia = true)
iteration 1 CP in 72.265s iteration 2 FMS: 0.1787239507237433 in 0.561s CP in 10.585s iteration 3 FMS: 0.8275266998510629 in 0.564s CP in 10.444s iteration 4 FMS: 0.9546349640908082 in 0.555s CP in 10.505s iteration 5 FMS: 0.9738888576682635 in 0.568s CP in 10.432s iteration 6 FMS: 0.9784436957558198 in 0.585s CP in 10.805s iteration 7 FMS: 0.9796436082370737 in 0.556s CP in 10.349s iteration 8 FMS: 0.9809194392104438 in 0.547s CP in 10.536s iteration 9 FMS: 0.9827444768836701 in 0.547s CP in 10.271s iteration 10 FMS: 0.9809896222713489 in 0.548s CP in 10.342s iteration 11 FMS: 0.9626553768281223 in 0.572s CP in 10.428s iteration 12 FMS: 0.8988240715529351 in 0.54s CP in 10.588s iteration 13 FMS: 0.9014089589114528 in 0.575s CP in 10.739s iteration 14 FMS: 0.980285777736786 in 0.539s CP in 10.809s iteration 15 FMS: 0.9926585881705703 in 0.574s CP in 10.621s CORCONDIA: Some(-1.335885203423836E13)
kruskal: KruskalTensor[shapeless.::[Time.type, shapeless.::[Destination.type, shapeless.::[Source.type, shapeless.HNil]]]] = tdm.core.KruskalTensor@51ce8dd6
// Extract the factor matrix of each dimension: each result is a 2-way tensor
// of Double indexed by (original dimension, Rank).
val sourceTensor = kruskal.extract(Source)
val destinationTensor = kruskal.extract(Destination)
val timeTensor = kruskal.extract(Time)
sourceTensor: Tensor[Double, shapeless.::[Source.type, shapeless.::[Rank.type, shapeless.HNil]]] = tdm.core.Tensor@7fd33f6c destinationTensor: Tensor[Double, shapeless.::[Destination.type, shapeless.::[Rank.type, shapeless.HNil]]] = tdm.core.Tensor@27ad27f9 timeTensor: Tensor[Double, shapeless.::[Time.type, shapeless.::[Rank.type, shapeless.HNil]]] = tdm.core.Tensor@61ac9eac
import plotly._
import plotly.element._
import plotly.layout._
import plotly.Almond._
// Initialise plotly-almond in offline mode (embeds plotly.js in the notebook).
init(offline=true)
import plotly._ import plotly.element._ import plotly.layout._ import plotly.Almond._
// Number of components extracted by the CP decomposition.
val nbRanks = kruskal.lambdas.size

// For every rank, plot the component's temporal profile (top) together with
// the source and destination IPs contributing most to it (bottom).
for (rank <- 0 until nbRanks) {
  // Sources with a non-negligible coefficient (|v| > 0.01), as (ip, |weight|)
  // pairs sorted by decreasing absolute weight.
  val sourceCollected = {
    val sources = sourceTensor.projection(Rank)(rank)
      .selection(v => math.abs(v) > 0.01).collect.orderByValuesDesc
    (for (i <- sources.indices) yield {
      (sources(Source, i), math.abs(sources(i)))
    }).toList.sortWith((d1, d2) => d1._2 > d2._2)
  }
  // Same extraction for the destination dimension.
  val destinationCollected = {
    val destinations = destinationTensor.projection(Rank)(rank)
      .selection(v => math.abs(v) > 0.01).collect.orderByValuesDesc
    (for (i <- destinations.indices) yield {
      (destinations(Destination, i), math.abs(destinations(i)))
    }).toList.sortWith((d1, d2) => d1._2 > d2._2)
  }
  // Full temporal profile of the component as (epochSeconds, weight),
  // ordered chronologically.
  val time = {
    val times = timeTensor.projection(Rank)(rank).collect
    (for (i <- times.indices) yield {
      (times(Time, i), times(i))
    }).toList.sortWith((d1, d2) => d1._1 < d2._1)
  }
  // One formatter for the whole series instead of allocating one per point.
  // NOTE(review): formats in the JVM default timezone — confirm this matches
  // how the dataset's timestamps were produced.
  val formatter = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  // `plot` is never reassigned, so it is a val (was `var` before).
  val plot = Seq(
    Scatter(
      // Epoch seconds -> "yyyy-MM-dd HH:mm:ss" labels on the x axis.
      time.map(v => formatter.format(new java.util.Date(v._1 * 1000))),
      time.map(v => math.abs(v._2)),
      name = "Time",
      xaxis = AxisReference.X1,
      yaxis = AxisReference.Y1
    ),
    Bar(
      sourceCollected.map(v => v._1),
      sourceCollected.map(v => v._2),
      name = "Source",
      xaxis = AxisReference.X2,
      yaxis = AxisReference.Y2
    ),
    Bar(
      destinationCollected.map(v => v._1),
      destinationCollected.map(v => v._2),
      name = "Destination",
      xaxis = AxisReference.X3,
      yaxis = AxisReference.Y3
    )
  )
  // Layout: time series spans the full width on top; source and destination
  // bar charts sit side by side underneath.
  val layout = Layout(
    title = s"Rank $rank",
    width = 1000,
    xaxis1 = Axis(anchor = AxisAnchor.Reference(AxisReference.Y1), domain = (0.0, 1.0), automargin = true),
    xaxis2 = Axis(anchor = AxisAnchor.Reference(AxisReference.Y2), domain = (0.0, 0.49), automargin = true),
    xaxis3 = Axis(anchor = AxisAnchor.Reference(AxisReference.Y3), domain = (0.51, 1.0), automargin = true),
    yaxis1 = Axis(anchor = AxisAnchor.Reference(AxisReference.X1), domain = (0.55, 1.0), automargin = true),
    yaxis2 = Axis(anchor = AxisAnchor.Reference(AxisReference.X2), domain = (0.0, 0.45), automargin = true),
    yaxis3 = Axis(anchor = AxisAnchor.Reference(AxisReference.X3), domain = (0.0, 0.45), automargin = true),
    legend = Legend(y = 1.1, x = .5, yanchor = Anchor.Top, xanchor = Anchor.Center, orientation = Orientation.Horizontal)
  )
  plot.plot(layout = layout, Config(), "")
}
nbRanks: Int = 10